home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import scanext
- import threading
- import time
- import os
- import Queue
- from base.g import *
- from base import utils
- EVENT_SCAN_CANCELED = 1
- TYPE_STR = {
- scanext.TYPE_BOOL: 'TYPE_BOOL',
- scanext.TYPE_INT: 'TYPE_INT',
- scanext.TYPE_FIXED: 'TYPE_FIXED',
- scanext.TYPE_STRING: 'TYPE_STRING',
- scanext.TYPE_BUTTON: 'TYPE_BUTTON',
- scanext.TYPE_GROUP: 'TYPE_GROUP' }
- UNIT_STR = {
- scanext.UNIT_NONE: 'UNIT_NONE',
- scanext.UNIT_PIXEL: 'UNIT_PIXEL',
- scanext.UNIT_BIT: 'UNIT_BIT',
- scanext.UNIT_MM: 'UNIT_MM',
- scanext.UNIT_DPI: 'UNIT_DPI',
- scanext.UNIT_PERCENT: 'UNIT_PERCENT',
- scanext.UNIT_MICROSECOND: 'UNIT_MICROSECOND' }
-
- class Option:
- '''Class representing a SANE option.
- Attributes:
- index -- number from 0 to n, giving the option number
- name -- a string uniquely identifying the option
- title -- single-line string containing a title for the option
- desc -- a long string describing the option; useful as a help message
- type -- type of this option. Possible values: TYPE_BOOL,
- TYPE_INT, TYPE_STRING, and so forth.
- unit -- units of this option. Possible values: UNIT_NONE,
- UNIT_PIXEL, etc.
- size -- size of the value in bytes
- cap -- capabilities available; CAP_EMULATED, CAP_SOFT_SELECT, etc.
- constraint -- constraint on values. Possible values:
- None : No constraint
- (min,max,step) Integer values, from min to max, stepping by
- list of integers or strings: only the listed values are allowed
- '''
-
- def __init__(self, args, cur_device):
- import string
- self.cur_device = cur_device
- (self.index, self.name, self.title, self.desc, self.type, self.unit, self.size, self.cap, self.constraint) = args
- if type(self.name) != type(''):
- self.name = str(self.name)
-
-
-
- def isActive(self):
- return scanext.isOptionActive(self.cap)
-
-
- def isSettable(self):
- return scanext.isOptionSettable(self.cap)
-
-
- def __repr__(self):
- if self.isSettable():
- settable = 'yes'
- else:
- settable = 'no'
- if self.isActive():
- active = 'yes'
- curValue = self.cur_device.getOption(self.name)
- else:
- active = 'no'
- curValue = '<not available, inactive option>'
- return '\nName: %s\nCur value: %s\nIndex: %d\nTitle: %s\nDesc: %s\nType: %s\nUnit: %s\nConstr: %s\nisActive: %s\nisSettable: %s\n' % (self.name, curValue, self.index, self.title, self.desc, TYPE_STR[self.type], UNIT_STR[self.unit], self.constraint, active, settable)
-
-
- def limitAndSet(self, value):
- if value is not None and self.constraint is not None:
- if type(self.constraint) == type(()):
- if value < self.constraint[0]:
- value = self.constraint[0]
- log.warn('Invalid value for %s (%s < min value of %d). Using %d.' % (self.name, self.name, value, value))
- elif value > self.constraint[1]:
- value = self.constraint[1]
- log.warn('Invalid value for %s (%s > max value of %d). Using %d.' % (self.name, self.name, value, value))
-
- self.cur_device.setOption(self.name, value)
- elif type(self.constraint) == type([]):
- if value not in self.constraint:
- v = self.constraint[0]
- min_dist = sys.maxint
- for x in self.constraint:
- if abs(value - x) < min_dist:
- min_dist = abs(value - x)
- v = x
- continue
-
- log.warn('Invalid value for %s (%s not in constraint list: %s). Using %d.' % (self.name, self.name, value, ', '.join(self.constraint), v))
- self.cur_device.setOption(self.name, v)
-
-
- else:
- value = self.cur_device.getOption(self.name)
- return value
-
-
-
- class ScanDevice:
- """Class representing a SANE device.
- Methods:
- startScan() -- initiate a scan, using the current settings
- cancelScan() -- cancel an in-progress scanning operation
-
- Also available, but rather low-level:
- getParameters() -- get the current parameter settings of the device
- getOptions() -- return a list of tuples describing all the options.
-
- Attributes:
- optlist -- list of option names
-
- You can also access an option name to retrieve its value, and to
- set it. For example, if one option has a .name attribute of
- imagemode, and scanner is a ScanDevice object, you can do:
- print scanner.imagemode
- scanner.imagemode = 'Full frame'
- scanner.['imagemode'] returns the corresponding Option object.
- """
-
- def __init__(self, dev):
- self.scan_thread = None
- self.dev = scanext.openDevice(dev)
- self.options = { }
- self._ScanDevice__load_options_dict()
-
-
- def __load_options_dict(self):
- opts = self.options
- opt_list = self.dev.getOptions()
- for t in opt_list:
- o = Option(t, self)
- if o.type != scanext.TYPE_GROUP:
- opts[o.name] = o
- continue
-
-
-
- def setOption(self, key, value):
- opts = self.options
- if key not in opts:
- opts[key] = value
- return None
- opt = opts[key]
- if opt.type == scanext.TYPE_GROUP:
- log.error("Groups can't be set: %s" % key)
-
- if not scanext.isOptionActive(opt.cap):
- log.error('Inactive option: %s' % key)
-
- if not scanext.isOptionSettable(opt.cap):
- log.error("Option can't be set by software: %s" % key)
-
- if type(value) == int and opt.type == scanext.TYPE_FIXED:
- value = float(value)
-
-
- try:
- self.last_opt = self.dev.setOption(opt.index, value)
- except scanext.error:
- log.error('Unable to set option %s to value %s' % (key, value))
-
- if self.last_opt & scanext.INFO_RELOAD_OPTIONS:
- self._ScanDevice__load_options_dict()
-
-
-
- def getOption(self, key):
- opts = self.options
- if key == 'optlist':
- return opts.keys()
- if key == 'area':
- return ((opts['tl-x'], opts['tl-y']), (opts['br-x'], opts['br-y']))
- if key not in opts:
- raise AttributeError, 'No such attribute: %s' % key
- key not in opts
- opt = opts[key]
- if opt.type == scanext.TYPE_BUTTON:
- raise AttributeError, "Buttons don't have values: %s" % key
- opt.type == scanext.TYPE_BUTTON
- if opt.type == scanext.TYPE_GROUP:
- raise AttributeError, "Groups don't have values: %s " % key
- opt.type == scanext.TYPE_GROUP
- if not scanext.isOptionActive(opt.cap):
- raise AttributeError, 'Inactive option: %s' % key
- scanext.isOptionActive(opt.cap)
- return self.dev.getOption(opt.index)
-
-
- def getOptionObj(self, key):
- opts = self.options
- if key in opts:
- return opts[key]
-
-
- def getParameters(self):
- """Return a 6-tuple holding all the current device settings:
- (format, format_name, last_frame, (pixels_per_line, lines), depth, bytes_per_line)
-
- - format is the SANE frame type
- - format is one of 'grey', 'color' (RGB), 'red', 'green', 'blue'.
- - last_frame [bool] indicates if this is the last frame of a multi frame image
- - (pixels_per_line, lines) specifies the size of the scanned image (x,y)
- - lines denotes the number of scanlines per frame
- - depth gives number of pixels per sample
- """
- return self.dev.getParameters()
-
-
- def getOptions(self):
- '''Return a list of tuples describing all the available options'''
- return self.dev.getOptions()
-
-
- def startScan(self, byte_format = 'BGRA', update_queue = None, event_queue = None):
- '''
- Perform a scan with the current device.
- Calls sane_start().
- '''
- if not self.isScanActive():
- status = self.dev.startScan()
- (self.format, self.format_name, self.last_frame, self.pixels_per_line, self.lines, self.depth, self.bytes_per_line) = self.dev.getParameters()
- self.scan_thread = ScanThread(self.dev, byte_format, update_queue, event_queue)
- self.scan_thread.scan_active = True
- self.scan_thread.start()
- return (True, self.lines * self.bytes_per_line, status)
- return (False, 0, scanext.SANE_STATUS_DEVICE_BUSY)
-
-
- def cancelScan(self):
- '''Cancel an in-progress scanning operation.'''
- return self.dev.cancelScan()
-
-
- def getScan(self):
- '''Get the output buffer and info about a completed scan.'''
- if not self.isScanActive():
- s = self.scan_thread
- return (s.buffer, s.format, s.format_name, s.pixels_per_line, s.lines, s.depth, s.bytes_per_line, s.pad_bytes, s.total_read)
-
-
- def freeScan(self):
- '''Cleanup the scan file after a completed scan.'''
- if not self.isScanActive():
- s = self.scan_thread
-
- try:
- s.buffer.close()
- os.remove(s.buffer_path)
- except (IOError, AttributeError):
- pass
- except:
- None<EXCEPTION MATCH>(IOError, AttributeError)
-
-
- None<EXCEPTION MATCH>(IOError, AttributeError)
-
-
- def isScanActive(self):
- if self.scan_thread is not None:
- if self.scan_thread.isAlive():
- pass
- return self.scan_thread.scan_active
- return False
-
-
- def waitForScanDone(self):
- if self.scan_thread is not None and self.scan_thread.isAlive() and self.scan_thread.scan_active:
-
- try:
- self.scan_thread.join()
- except KeyboardInterrupt:
- pass
- except:
- None<EXCEPTION MATCH>KeyboardInterrupt
-
-
- None<EXCEPTION MATCH>KeyboardInterrupt
-
-
- def waitForScanActive(self):
- time.sleep(0.5)
- if self.scan_thread is not None:
- while True:
- if self.scan_thread.isAlive() and self.scan_thread.scan_active:
- return None
- time.sleep(0.5)
- continue
- self.scan_thread.scan_active
-
-
-
- def closeScan(self):
- '''Close the SANE device after a scan.'''
- self.dev.closeScan()
-
-
-
- class ScanThread(threading.Thread):
-
- def __init__(self, device, byte_format = 'BGRA', update_queue = None, event_queue = None):
- threading.Thread.__init__(self)
- self.scan_active = True
- self.dev = device
- self.update_queue = update_queue
- self.event_queue = event_queue
- (self.buffer_fd, self.buffer_path) = utils.make_temp_file(prefix = 'hpscan')
- self.buffer = os.fdopen(self.buffer_fd, 'w+b')
- self.format = -1
- self.format_name = ''
- self.last_frame = -1
- self.pixels_per_line = -1
- self.lines = -1
- self.depth = -1
- self.bytes_per_line = -1
- self.pad_bytes = -1
- self.total_read = 0
- self.byte_format = byte_format
-
-
- def updateQueue(self, status, bytes_read):
- if self.update_queue is not None:
-
- try:
- status = int(status)
- except (ValueError, TypeError):
- status = -1
-
- self.update_queue.put((status, bytes_read))
- time.sleep(0)
-
-
-
- def run(self):
- (self.format, self.format_name, self.last_frame, self.pixels_per_line, self.lines, self.depth, self.bytes_per_line) = self.dev.getParameters()
- log.debug('format=%d' % self.format)
- log.debug('format_name=%s' % self.format_name)
- log.debug('last_frame=%d' % self.last_frame)
- log.debug('ppl=%d' % self.pixels_per_line)
- log.debug('lines=%d' % self.lines)
- log.debug('depth=%d' % self.depth)
- log.debug('bpl=%d' % self.bytes_per_line)
- log.debug('byte_format=%s' % self.byte_format)
- w = self.buffer.write
- if self.format == scanext.FRAME_RGB:
- if self.depth == 8:
- self.pad_bytes = self.bytes_per_line - 3 * self.pixels_per_line
- log.debug('pad_bytes=%d' % self.pad_bytes)
- dir = -1
- if self.byte_format == 'RGBA':
- dir = 1
-
-
- try:
- (st, t) = self.dev.readScan(self.bytes_per_line)
- except scanext.error:
- st = None
- self.updateQueue(st, 0)
-
- while st == scanext.SANE_STATUS_GOOD:
- if t:
- index = 0
- while index < len(t) - self.pad_bytes:
- w(t[index:index + 3:dir])
- w('\xff')
- index += 3
- self.total_read += len(t)
- self.updateQueue(st, self.total_read)
- log.debug('Read %d bytes' % self.total_read)
- else:
- time.sleep(0.1)
-
- try:
- (st, t) = self.dev.readScan(self.bytes_per_line)
- except scanext.error:
- st = None
- self.updateQueue(st, self.total_read)
- break
-
- if self.checkCancel():
- break
- continue
-
- elif self.format == scanext.FRAME_GRAY:
- if self.depth == 1:
- self.pad_bytes = self.bytes_per_line - (self.pixels_per_line + 7) // 8
- log.debug('pad_bytes=%d' % self.pad_bytes)
-
- try:
- (st, t) = self.dev.readScan(self.bytes_per_line)
- except scanext.error:
- st = None
- self.updateQueue(st, 0)
-
- while st == scanext.SANE_STATUS_GOOD:
- if t:
- index = 0
- while index < len(t) - self.pad_bytes:
- k = 128
- j = ord(t[index])
- for b in range(8):
- if k & j:
- w('\x00\x00\x00\xff')
- else:
- w('\xff\xff\xff\xff')
- k = k >> 1
-
- index += 1
- self.total_read += len(t)
- self.updateQueue(st, self.total_read)
- log.debug('Read %d bytes' % self.total_read)
- else:
- time.sleep(0.1)
-
- try:
- (st, t) = self.dev.readScan(self.bytes_per_line)
- except scanext.error:
- st = None
- self.updateQueue(st, self.total_read)
- break
-
- if self.checkCancel():
- break
- continue
- elif self.depth == 8:
- self.pad_bytes = self.bytes_per_line - self.pixels_per_line
- log.debug('pad_bytes=%d' % self.pad_bytes)
-
- try:
- (st, t) = self.dev.readScan(self.bytes_per_line)
- except scanext.error:
- st = None
- self.updateQueue(st, 0)
-
- while st == scanext.SANE_STATUS_GOOD:
- if t:
- index = 0
- while index < len(t) - self.pad_bytes:
- j = t[index]
- w(j)
- w(j)
- w(j)
- w('\xff')
- index += 1
- self.total_read += len(t)
- self.updateQueue(st, self.total_read)
- log.debug('Read %d bytes' % self.total_read)
- else:
- time.sleep(0.1)
-
- try:
- (st, t) = self.dev.readScan(self.bytes_per_line)
- except scanext.error:
- st = None
- self.updateQueue(st, self.total_read)
- break
-
- if self.checkCancel():
- break
- continue
-
-
- self.buffer.seek(0)
- self.scan_active = False
- log.debug('Scan thread exiting...')
-
-
- def checkCancel(self):
- canceled = False
- while self.event_queue.qsize():
-
- try:
- event = self.event_queue.get(0)
- if event == EVENT_SCAN_CANCELED:
- canceled = True
- log.debug('Cancel pressed!')
- self.dev.cancelScan()
- continue
- except Queue.Empty:
- break
- continue
-
-
- None<EXCEPTION MATCH>Queue.Empty
- return canceled
-
-
-
- def init():
- return scanext.init()
-
-
- def deInit():
- return scanext.deInit()
-
-
- def openDevice(dev):
- '''Open a device for scanning'''
- return ScanDevice(dev)
-
-
- def getDevices(local_only = 0):
- return scanext.getDevices(local_only)
-
-
- def reportError(code):
- log.error('SANE: %s (code=%d)' % (scanext.getErrorMessage(code), code))
-
-